{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Connect and harvest data from Twitter\n",
    "\n",
    "* Logging capability using the Python `logging` library to record errors and warnings in case of program failure.\n",
    "* Data persistence capability using MongoDB (through the `IO_Mongo` class) as well as a JSON file.\n",
    "* API rate limit and error management capability, so that calls to Twitter are more resilient and we do not get barred for tapping into the firehose."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Index([u'consumer_key', u' consumer_secret', u'access_token',\n",
       "       u'access_secret'],\n",
       "      dtype='object')"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import os, io, json\n",
    "import logging\n",
    "import twitter\n",
    "import urlparse\n",
    "import pandas as pd\n",
    "from pymongo import MongoClient as MCli\n",
    "\n",
    "# Load the twitter API keys.\n",
    "# skipinitialspace=True drops blanks that follow a delimiter, so a header written as\n",
    "# 'consumer_key, consumer_secret' no longer produces a column named ' consumer_secret'.\n",
    "twitter_tokens = pd.read_csv(\"../twitter_tokens.csv\", skipinitialspace=True)\n",
    "twitter_tokens.columns"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create JSON I/O"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "class IO_json(object):\n",
    "    \"\"\" Use pandas dataframe to create, insert, load json data.\"\"\"\n",
    "    def __init__(self, filepath, filename):\n",
    "        self.filepath = filepath\n",
    "        self.filename = filename\n",
    "        self.file_io = os.path.join(filepath, filename)\n",
    "        \n",
    "        \n",
    "    def save(self, data):\n",
    "        \"\"\" Save data as Pandas dataframe and convert to json. Check if file already exist...\"\"\"\n",
    "        if os.path.isfile('{0}/{1}.json'.format(self.filepath, self.filename)):\n",
    "            # If file exists, append data to the file\n",
    "            with io.open('{0}/{1}.json'.format(self.filepath, self.filename), 'a', encoding='utf-8') as f:\n",
    "                f.write(unicode(json.dumps(data, ensure_ascii=False)))\n",
    "                #                 f.write(json.dumps(data, ensure_ascii=False))           # No unicode in PYTHON 3\n",
    "        else:\n",
    "            # Create new file if .json file does not exist\n",
    "            with io.open('{0}/{1}.json'.format(self.filepath, self.filename), 'w', encoding='utf-8') as f:\n",
    "                f.write(unicode(json.dumps(data, ensure_ascii=False)))\n",
    "                #                 f.write(json.dumps(data, ensure_ascii=False))           # No unicode in PYTHON 3\n",
    "                \n",
    "                \n",
    "    \n",
    "    # Load method returns the file that has been read.\n",
    "    def load(self):\n",
    "        with io.open('{0}/{1}.json'.format(self.filepath, self.filename), encoding='utf-8') as f:\n",
    "            return f.read()\n",
    "            \n",
    "            "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Test the `IO_json` class"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 68,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Sample JSON document used to exercise IO_json\n",
    "sample_json = '''{\n",
    "\"Country\":\n",
    "    {\"0\":\"Algeria\",\"1\":\"Angola\",\"2\":\"Benin\",\"3\":\"Botswana\",\"4\":\"Burkina\"},\n",
    "\"Region\":\n",
    " {\"0\":\"AFRICA\",\"1\":\"AFRICA\",\"2\":\"AFRICA\",\"3\":\"AFRICA\",\"4\":\"AFRICA\"}\n",
    "}'''\n",
    "\n",
    "# Parse the text first: saving the raw string would store a single quoted,\n",
    "# escaped JSON string instead of the Country/Region object.\n",
    "sample_data = json.loads(sample_json)\n",
    "\n",
    "# Make sure the target directory exists before writing into it\n",
    "if not os.path.isdir('json_data'):\n",
    "    os.makedirs('json_data')\n",
    "\n",
    "# Save the document: the file is created on first use and appended to afterwards\n",
    "jsn = IO_json('json_data', 'json_test_file')\n",
    "jsn.save(sample_data)\n",
    "\n",
    "# Read the file back to check what was written\n",
    "jsn.load()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create MongoDB Connections"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 69,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "class IO_Mongo(object):\n",
    "    \"\"\"Connect to the mongo server on localhost at port 27017.\"\"\"\n",
    "    conn={'host':'localhost', 'ip':'27017'}\n",
    "\n",
    "\n",
    "    # Initialize the class with client connection, the database (i.e. twtr_db), and the collection (i.e. twtr_coll)\n",
    "    def __init__(self, db='twtr_db', coll='twtr_coll', **conn):\n",
    "        \"\"\"Connect to the MonfoDB server\"\"\"\n",
    "        self.client = MCli(**conn)\n",
    "        self.db = self.client[db]\n",
    "        self.coll = self.db[coll]\n",
    "\n",
    "\n",
    "    # The `save` method inserts new records in the pre_initialized collection and database\n",
    "    def save(self, data):\n",
    "        \"\"\" Insert data to collection in db. \"\"\"\n",
    "        return self.coll.insert(data)\n",
    "    \n",
    "    \n",
    "    # The `load` method allows the retrieval of specific records\n",
    "    def load(self, return_cursor=False, criteria=None, projection=None):\n",
    "        \"\"\" The `load` method allows the retrieval of specific records according to criteria and projection. \n",
    "            In case of large amount of data, it returns a cursor.\n",
    "        \"\"\"\n",
    "        if criteria is None:\n",
    "            criteria = {}\n",
    "        \n",
    "        # Find record according to some criteria.\n",
    "        if projection is None:\n",
    "            cursor = self.coll.find(criteria)\n",
    "        else:\n",
    "            cursor = self.coll.find(criteria, projection)\n",
    "        \n",
    "        # Return a cursor for large amount of data\n",
    "        if return_cursor:\n",
    "            return cursor\n",
    "        else:\n",
    "            return [item for item in cursor]\n",
    "        \n",
    "        "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Initialize by instantiating the `Twitter API` with our credentials"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 78,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "\n",
    "class TwitterAPI(object):\n",
    "    \"\"\"\n",
    "        TwitterAPI class allows the Connection to Twitter via OAuth\n",
    "        once you have registered with Twitter and receive the\n",
    "        necessary credentials.\n",
    "    \"\"\"\n",
    "    # Initialize key variables and get the twitter credentials\n",
    "    def __init__(self):\n",
    "        consumer_key = twitter_tokens.values.flatten()[0]\n",
    "        consumer_secret = twitter_tokens.values.flatten()[1]\n",
    "        access_token = twitter_tokens.values.flatten()[2]\n",
    "        access_secret = twitter_tokens.values.flatten()[3]\n",
    "        \n",
    "        self.consumer_key = consumer_key\n",
    "        self.consumer_secret = consumer_secret\n",
    "        self.access_token = access_token\n",
    "        self.access_secret = access_secret\n",
    "    \n",
    "    # Set number of retries\n",
    "        self.retries = 3\n",
    "        \n",
    "    # Authenticate credentials with Twitter using OAuth\n",
    "        self.auth = twitter.oauth.OAuth(access_token, access_secret, consumer_key, consumer_secret)\n",
    "        \n",
    "    # Create registered Twitter API\n",
    "        self.api = twitter.Twitter(auth=self.auth)\n",
    "        \n",
    "# Initialize the Logger by providing the log level\n",
    "# logger.debug(debug message), logger.info(info message), \n",
    "# logger.warn(warn message), logger.critical(critical message)\n",
    "\n",
    "    # Logger initialization\n",
    "        appName = 'twt150530'\n",
    "        self.logger = logging.getLogger(appName)\n",
    "        \n",
    "        # self.logger.setLevel(logging.DEBUG)\n",
    "        # create console handler and set level to debug\n",
    "        logPath = './log_data'\n",
    "        fileName = appName\n",
    "        fileHandler = logging.FileHandler(\"{0}/{1}.log\".format(logPath, fileName))\n",
    "        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')\n",
    "        fileHandler.setFormatter(formatter)\n",
    "        self.logger.addHandler(fileHandler)\n",
    "        self.logger.setLevel(logging.DEBUG)\n",
    "        \n",
    "        \n",
    "    # initialize the JSON file persistence isntruction\n",
    "        jsonF_path = './json_data'\n",
    "        jsonF_name = 'twt15053001'\n",
    "        self.jsonSaver = IO_json(jsonF_path, jsonF_name)\n",
    "        \n",
    "    \n",
    "    # Initialize the MongoDB database and collections for persistence\n",
    "        self.mongoSaver = IO_Mongo(db='twtr01_db', coll='twtr01_coll')\n",
    "        \n",
    "    \n",
    "        \n",
    "    # Search Twitter with query q (i.e \"ApacheSpark\") and max result\n",
    "    def searchTwitter(self, q, max_res=10, **kwargs):\n",
    "        search_results = self.api.search.tweets(q=q, count=10, **kwargs)\n",
    "        statuses = search_results['statuses']\n",
    "        max_results = min(1000, max_res)\n",
    "        \n",
    "        for _ in range(10):\n",
    "            try:\n",
    "                next_results = search_results['search_metadata']['next_results']\n",
    "            except KeyError as e:\n",
    "# Add new logging update\n",
    "                self.logger.error(\"Error in searchTwitter: {}\".format(e))\n",
    "                break\n",
    "            \n",
    "            next_results = urlparse.parse_qsl(next_results[1:])\n",
    "            kwargs = dict(next_results)\n",
    "            \n",
    "            search_results = self.api.search.tweets(**kwargs)\n",
    "            statuses += search_results['statuses']\n",
    "            self.saveTweets(search_results['statuses'])\n",
    "            \n",
    "            if len(statuses) > max_results:\n",
    "                self.logger.info('info in searchTwitter - got {} tweets - max: {}'.format(len(statuses), max_results))\n",
    "                break\n",
    "            \n",
    "        return statuses\n",
    "    \n",
    "    \n",
    "#   The saveTweets method actually saves the collected tweets in JSON and in MongoDB:\n",
    "    def saveTweets(self, statuses):\n",
    "        # Saving to JSON File\n",
    "        self.jsonSaver.save(statuses)\n",
    "        \n",
    "        # Saving to MongoDB\n",
    "        for s in statuses:\n",
    "            self.mongoSaver.save(s)\n",
    "    \n",
    "    \n",
    "    # Parse tweets as it is collected to extract ID, creation date, userID, tweet text\n",
    "    def parseTweets(self, statuses):\n",
    "        tweetx = [(status['id'],\n",
    "                   status['created_at'],\n",
    "                   status['user']['id'],\n",
    "                   status['user']['name'],\n",
    "                   status['text']['text'],\n",
    "                   url['expanded_url']) \n",
    "                    for status in statuses \n",
    "                      for url in status['entities']['urls']\n",
    "                 ]\n",
    "        return tweetx\n",
    "    \n",
    "    \n",
    "    \n",
    "    # The getTweets method calls the searchTwitter method described previously. \n",
    "    # The getTweets method ensures that API calls are made reliably whilst respecting the imposed rate limit. \n",
    "    def getTweets(self, q, max_res=10):\n",
    "        pass\n",
    "    "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 79,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>0</th>\n",
       "      <th>1</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>a</td>\n",
       "      <td>1</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>b</td>\n",
       "      <td>5</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>c</td>\n",
       "      <td>3</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   0  1\n",
       "0  a  1\n",
       "1  b  5\n",
       "2  c  3"
      ]
     },
     "execution_count": 79,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Build a small two-column frame from (label, value) pairs\n",
    "pd.DataFrame(data=[('a', 1), ('b', 5), ('c', 3)])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
